/******************************************************************************
This file is part of AppKit.
Project: appkit
Author : FergusZeng
Email  : cblock@126.com
git	   : https://gitee.com/newgolo/appkit.git
*******************************************************************************
MIT License

Copyright (c) 2022 cblock@126.com

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
******************************************************************************/
#include "appkit/ipc.h"

#include <fcntl.h>
#include <stdlib.h>
#include <sys/file.h>
#include <sys/mman.h>
#include <sys/mount.h>
#include <sys/stat.h>
#include <unistd.h>
#if USE_SYSV
#else
#include <time.h>
#endif

#include "appkit/strutil.h"
#include "appkit/tracer.h"

#define MAX_MESSAGES 200 /* maximum number of messages allowed */

#define USE_SEMAPHORE 0  // 1: guard shared-memory reads/writes with a named semaphore; 0 (default): use a file lock

namespace appkit {

/**
 * @brief Create a message-queue object bound to @p key.
 *
 * Only the key is recorded here; the queue itself is created by
 * initialize(). Both initialize() and the destructor derive the queue
 * identity from m_key, so the key has to be stored.
 *
 * @param key Identifier of the queue.
 */
MsgQueue::MsgQueue(key_t key) : m_key(key) {}

/**
 * @brief Release the queue.
 *
 * POSIX build: closes the descriptor obtained from mq_open() and removes the
 * queue name derived from m_key. SysV build: nothing is done.
 */
MsgQueue::~MsgQueue() {
#if USE_SYSV
#else
    // initialize() treats a negative m_msgID as "mq_open() failed", so only a
    // non-negative value is handed to mq_close().
    // NOTE(review): assumes m_msgID is negative before initialize() has run —
    // confirm the in-class default in the header.
    if (m_msgID >= 0) {
        mq_close(static_cast<mqd_t>(m_msgID));
    }
    mq_unlink(CSTR(getMqName(m_key)));
#endif
}

/**
 * @brief Build the POSIX message-queue name for a key.
 *
 * @param key Queue key.
 * @return "/mq_" followed by the key in lowercase hexadecimal; an empty
 *         string when built with USE_SYSV.
 */
std::string MsgQueue::getMqName(int key) {
#if USE_SYSV
    return std::string();
#else
    char nameBuf[32] = {0};
    snprintf(nameBuf, sizeof(nameBuf), "/mq_%x", key);
    return nameBuf;
#endif
}

bool MsgQueue::initialize() {
#if USE_SYSV
    m_msgID = msgget(m_key, 0666 | IPC_CREAT);
#else
    if (!Directory::exists(POSIX_MQFS)) {
        if (!Directory::createDir(POSIX_MQFS, 0666)) {
            return false;
        }
        // System::execute("mount -t mqueue none "POSIX_MQFS);
        if (mount("none", POSIX_MQFS, "mqueue", 0, nullptr) != 0) {
            TRACE_ERR_CLASS("mount mqueue error!");
            return false;
        }
    }
    mq_unlink(CSTR(getMqName(m_key)));
    int oflags = O_RDWR | O_CREAT | O_EXCL;
    mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
    struct mq_attr attr;
    attr.mq_maxmsg = 10;
    attr.mq_msgsize = MSG_SIZE_MAX + 4;
    m_msgID =
        static_cast<int>(mq_open(CSTR(getMqName(m_key)), oflags, mode, &attr));
#endif
    if (m_msgID < 0) {
        TRACE_ERR_CLASS("MsgQueue(key:0x%x) initialize failed:%s.", m_key,
                        ERRSTR);
        return false;
    }
    return true;
}

/**
 * @brief Put one message on the queue.
 *
 * The message is rejected when its type is not positive or when its data
 * length is not below MSG_SIZE_MAX.
 * SysV build: msgsnd() with a payload of m_dataLen + sizeof(m_dataLen) bytes.
 * POSIX build: mq_timedsend() of MSG_SIZE_MAX + 4 bytes starting at &msg.
 *
 * @param msg Message to send.
 * @return RC_OK on success, RC_ERROR on invalid input or send failure.
 */
int MsgQueue::sendMsg(const QueueMsgBody& msg) {
    if (msg.m_msgType <= 0) {
        TRACE_ERR_CLASS("msg type must be greater than zero!");
        return RC_ERROR;
    }
    if (msg.m_dataLen >= MSG_SIZE_MAX) {
        TRACE_ERR_CLASS("msg data len must be less than %d!", MSG_SIZE_MAX);
        return RC_ERROR;
    }
    int rc = 0;
#if USE_SYSV
    rc = msgsnd(m_msgID,
                reinterpret_cast<void*>(const_cast<QueueMsgBody*>(&msg)),
                msg.m_dataLen + sizeof(msg.m_dataLen), 0);
#else
    // mq_timedsend() expects an ABSOLUTE CLOCK_REALTIME deadline, not a
    // relative interval, so the 10 ns wait is added to the current time.
    struct timespec deadline;
    if (clock_gettime(CLOCK_REALTIME, &deadline) != 0) {
        // Without a clock reading fall back to an already-expired deadline,
        // which makes the call return at once when the queue is full.
        deadline.tv_sec = 0;
        deadline.tv_nsec = 0;
    }
    deadline.tv_nsec += 10;
    if (deadline.tv_nsec >= 1000000000L) {
        deadline.tv_sec += 1;
        deadline.tv_nsec -= 1000000000L;
    }
    rc = mq_timedsend(static_cast<mqd_t>(m_msgID),
                      reinterpret_cast<const char*>(&msg), MSG_SIZE_MAX + 4, 0,
                      &deadline);
#endif
    if (rc < 0) {
        // TRACE_ERR_CLASS("MsgQueue(key:0x%x) send msg(type:%d,datalen:%d)
        // failed.",m_key,msg.m_msgType,msg.m_dataLen);
        return RC_ERROR;
    }
    return RC_OK;
}

int MsgQueue::recvMsg(const QueueMsgBody& msg, int msgType) {
    int rc = 0;
#if USE_SYSV
    rc = msgrcv(m_msgID,
                reinterpret_cast<void*>(const_cast<QueueMsgBody*>(&msg)),
                MSG_SIZE_MAX, (long)msgType, /* NOLINT */
                IPC_NOWAIT | MSG_NOERROR);
#else
    struct timespec timeout;
    timeout.tv_sec = 0;
    timeout.tv_nsec = 10;
    rc = mq_timedreceive(
        (mqd_t)m_msgID,
        reinterpret_cast<char*>(const_cast<QueueMsgBody*>(&msg)),
        MSG_SIZE_MAX + 4, 0, &timeout);
#endif
    if (rc < 0) {
        // TRACE_ERR_CLASS("MsgQueue(key:0x%x) recieve message(%d)
        // failed(%s).",m_key, msgType,ERRSTR);
        return RC_ERROR;
    }
    return RC_OK;
}

int MsgQueue::clearMsg(int msgType) {
    QueueMsgBody msg;
    while (recvMsg(msg, msgType) == RC_OK) {
        continue;
    }
    return RC_OK;
}

// Default constructor; performs no work of its own.
MsgQueueFactory::MsgQueueFactory() = default;

// Destructor; performs no work of its own.
MsgQueueFactory::~MsgQueueFactory() = default;

/**
 * @brief Return the queue registered for @p key, creating it on first use.
 *
 * The lookup and the creation both happen while m_mutex is held. A newly
 * created queue is stored in the map only after initialize() succeeded.
 *
 * @param key Queue key.
 * @return The shared queue object, or nullptr when initialisation failed.
 */
std::shared_ptr<MsgQueue> MsgQueueFactory::getMsgQueue(key_t key) {
    std::lock_guard<std::mutex> lock(m_mutex);
    const auto found = m_msgQueueMap.find(key);
    if (found != m_msgQueueMap.end()) {
        return found->second;
    }

    // std::make_shared reports allocation failure by throwing, so the result
    // needs no null check.
    auto queue = std::make_shared<MsgQueue>(key);
    if (!queue->initialize()) {
        TRACE_ERR_CLASS("create MsgQueue[key=0x%x] failed!", key);
        return nullptr;
    }
    m_msgQueueMap.insert(std::make_pair(key, queue));
    return queue;
}

// Default constructor; the pool becomes usable only after init().
MemPool::MemPool() = default;

/**
 * @brief Release the pool buffer with free().
 *
 * NOTE(review): init() also stores a caller-supplied memStart in m_poolStart,
 * so such a buffer is passed to free() here as well — confirm that callers
 * really hand over ownership of that memory.
 */
MemPool::~MemPool() {
    free(m_poolStart);
}

bool MemPool::init(int blockNum, int blockSize, void* memStart) {
    std::lock_guard<std::mutex> lock(m_poolMutex);
    if (blockNum <= 0 || blockSize <= 0) {
        return false;
    }
    m_blockNum = blockNum;
    m_blockSize = blockSize;
    if (memStart) {
        m_poolStart = memStart;
    } else {
        m_poolStart = calloc(m_blockNum, m_blockSize);
        if (!m_poolStart) {
            return false;
        }
    }
    for (auto i = 0; i < m_blockNum; i++) {
        auto memBlock = std::make_unique<MemBlock>();
        memBlock->isUsed = false;
        memBlock->memStart = m_poolStart + (i * m_blockSize);
        m_memBlocks.push_back(std::move(memBlock));
    }
    return true;
}

/**
 * @brief Reserve a run of consecutive free blocks large enough for
 *        @p memorySize bytes and tag them with @p memoryName.
 *
 * The table is scanned from the first block; the first run of free blocks
 * that is long enough is marked as used.
 *
 * @param memoryName Tag stored in every reserved block; must not be empty.
 * @param memorySize Requested size in bytes; must be > 0.
 * @return Start address of the first reserved block, or nullptr when the
 *         arguments are invalid, the pool is not set up, or no sufficiently
 *         long free run exists.
 */
void* MemPool::getMemory(const std::string& memoryName, int memorySize) {
    std::lock_guard<std::mutex> lock(m_poolMutex);
    if (memorySize <= 0 || m_blockSize <= 0 || m_blockNum <= 0 ||
        memoryName.empty()) {
        return nullptr;
    }
    // Ceiling division written without "memorySize + m_blockSize - 1", which
    // overflows int for very large requests.
    const int blocks =
        memorySize / m_blockSize + ((memorySize % m_blockSize) != 0 ? 1 : 0);

    // Never index past the blocks that actually exist in the table.
    int total = static_cast<int>(m_memBlocks.size());
    if (m_blockNum < total) {
        total = m_blockNum;
    }
    if (blocks > total) {
        return nullptr;
    }

    int run = 0;  // length of the current run of free blocks
    for (int i = 0; i < total; i++) {
        if (m_memBlocks[i]->isUsed) {
            run = 0;
            continue;
        }
        if (++run < blocks) {
            continue;
        }
        const int first = i - blocks + 1;
        for (int j = first; j <= i; j++) {
            m_memBlocks[j]->isUsed = true;
            m_memBlocks[j]->memName = memoryName;
        }
        return m_memBlocks[first]->memStart;
    }
    return nullptr;
}

/**
 * @brief Give back every used block that carries the tag @p memoryName.
 *
 * @param memoryName Tag that was passed to getMemory().
 * @return false when the pool geometry is not set, true otherwise (also when
 *         no block matched).
 */
bool MemPool::putMemory(const std::string& memoryName) {
    std::lock_guard<std::mutex> lock(m_poolMutex);
    if (m_blockSize <= 0 || m_blockNum <= 0) {
        return false;
    }
    for (auto idx = 0; idx < m_blockNum; ++idx) {
        auto& block = m_memBlocks[idx];
        if (!block->isUsed || !(block->memName == memoryName)) {
            continue;
        }
        block->memName = "";
        block->isUsed = false;
    }
    return true;
}

/**
 * @brief Trace one debug line per block: tag, used flag, index and address.
 *
 * The block table is read without taking m_poolMutex.
 */
void MemPool::showMemory() {
    for (auto i = 0; i < m_blockNum; i++) {
        const auto& block = m_memBlocks[i];
        // The address is a pointer, so it is printed with %p; an integer
        // conversion such as %08x does not match a pointer argument.
        TRACE_DBG("name:%s,use:%d,block:%d,addr:%p", CSTR(block->memName),
                  block->isUsed, i,
                  static_cast<const void*>(block->memStart));
    }
}

// Default constructor; performs no work of its own.
Fifo::Fifo() = default;
// Destructor; performs no work of its own.
Fifo::~Fifo() = default;
/**
 * @brief Open a named pipe in non-blocking mode, creating it when missing.
 *
 * All argument and state checks are done before the filesystem is touched,
 * so a rejected call never leaves a freshly created pipe behind. An existing
 * path is accepted only when it is a FIFO.
 *
 * @param devName Path of the FIFO; must not be empty.
 * @param ioMode  IO_MODE_RD_ONLY, IO_MODE_WR_ONLY or IO_MODE_RDWR_ONLY.
 * @return true when the FIFO was opened; false on invalid arguments, when
 *         this object already holds an open descriptor, when the path exists
 *         but is not a FIFO, or when mkfifo()/open() fails.
 */
bool Fifo::open(const std::string& devName, int ioMode) {
    if (devName.empty()) {
        TRACE_ERR_CLASS("fifo name is empty!");
        return false;
    }
    if (m_fd >= 0) {
        TRACE_ERR_CLASS("Device is already opened!");
        return false;
    }

    int openFlags = O_NONBLOCK;
    switch (ioMode) {
        case IO_MODE_RD_ONLY:
            openFlags |= O_RDONLY;
            break;
        case IO_MODE_WR_ONLY:
            openFlags |= O_WRONLY;
            break;
        case IO_MODE_RDWR_ONLY:
            openFlags |= O_RDWR;
            break;
        default:
            TRACE_ERR_CLASS("Unsupport IO Mode: %d", ioMode);
            return false;
    }

    // stat() follows symbolic links, so a link that points at a FIFO is fine.
    struct stat fileStat;
    if (0 == ::stat(devName.c_str(), &fileStat)) {
        if (!S_ISFIFO(fileStat.st_mode)) {
            TRACE_ERR_CLASS("%s exists but is not a fifo!", devName.c_str());
            return false;
        }
    } else if (mkfifo(devName.c_str(), 0666) < 0) {
        TRACE_ERR_CLASS("mkfifo error:%s.", ERRSTR);
        return false;
    }

    m_fd = ::open(devName.c_str(), openFlags);
    if (m_fd < 0) {
        TRACE_ERR_CLASS("Open %s error: %s", devName.data(), ERRSTR);
        return false;
    }
    m_name = devName;
    m_openMode = ioMode;
    return true;
}

// Default constructor; nothing is opened or mapped until open() is called.
MemShared::MemShared() = default;

/**
 * @brief Close the mapping on destruction, except for SHM_MMAP objects.
 */
MemShared::~MemShared() {
    /* shm-backed memory does not need to be closed automatically */
    if (m_type == SHM_MMAP) {
        return;
    }
    close();
}

bool MemShared::open(std::string name, int size, int mode,
                     MemShared::Type type) {
    if (name.empty()) {
        TRACE_ERR_CLASS("not specify shared name!");
        return false;
    }
    if (m_fd >= 0) {
        TRACE_ERR_CLASS("shared already exists!");
        return false;
    }

    int flag, mmapProt;
    switch (mode) {
        case IO_MODE_RD_ONLY: /* 只读 */
            flag = O_RDWR;
            mmapProt = PROT_READ;
            break;
        case IO_MODE_RDWR_ONLY: /* 只读写,不创建 */
            flag = O_RDWR;
            mmapProt = PROT_READ | PROT_WRITE;
            break;
        case IO_MODE_RDWR_ORNEW: /* 可读写,没有则创建 */
            flag = O_RDWR | O_CREAT;
            mmapProt = PROT_READ | PROT_WRITE;
            break;
        case IO_MODE_REWR_ORNEW: /* 重新写,没有则创建 */
            flag = O_RDWR | O_CREAT | O_TRUNC;
            mmapProt = PROT_READ | PROT_WRITE;
            break;
        default:
            TRACE_ERR_CLASS("Unsupport IO Mode: %d", mode);
            return false;
    }

    FilePath filePath(name);
    auto fileDir = filePath.dirName();
    if (!Directory::exists(fileDir)) {
        if (!Directory::createDir(fileDir, 0777, true)) {
            TRACE_ERR_CLASS("create shared dir[%s] error!", CSTR(fileDir));
            return false;
        }
    }

    /* open中的第三个参数只有在flag包含O_CREAT时有效 */
    if (type == SHM_MMAP) {
        m_fd = ::shm_open(CSTR(name), flag, 0664);
    } else {
        m_fd = ::open(CSTR(name), flag, 0664);
    }
    if (m_fd < 0) {
        TRACE_ERR_CLASS("open shared[%s] error: %s", CSTR(name), ERRSTR);
        return false;
    }
    m_type = type;
    /* 设置共享内存大小 */
    if (size > 0 && ftruncate(m_fd, size) < 0) {
        TRACE_ERR_CLASS("set shared[%s] error: %s!", CSTR(name), ERRSTR);
        close();
        return false;
    }
    /* 映射内存 */
    m_memStart = mmap(nullptr, size, mmapProt, MAP_SHARED, m_fd, 0);
    if (MAP_FAILED == m_memStart) {
        TRACE_ERR_CLASS("map shared[%s] error: %s!", CSTR(name), ERRSTR);
        m_memStart = nullptr;
        close();
        return false;
    }

#if USE_SEMAPHORE
#if 0
    auto semName = StrUtil::replaceString(name, "/", "$");
    semName = "/" + semName;
    m_procSem = std::make_shared<Semaphore>();
#else
    auto semName = name;
    m_procSem = std::make_shared<SemaphoreV>();
#endif
    // TRACE_DBG_CLASS("sem name: %s", semName.data());
    if (!m_procSem->open(semName, 1)) {
        TRACE_ERR_CLASS("open semaphonre[%s] error: %s", CSTR(semName), ERRSTR);
        return false;
    }
#endif

    m_size = this->size();
    m_name = name;
    return true;
}

bool MemShared::close() {
    if (m_name.empty() || m_fd < 0) {
        return true;
    }
    if (m_memStart && munmap(m_memStart, m_size) != 0) {
        TRACE_ERR_CLASS("unmap shared[%s], error:%s", CSTR(m_name), ERRSTR);
        return false;
    }
    m_memStart = nullptr;
    if (m_type == SHM_MMAP) {
        if (shm_unlink(CSTR(m_name)) != 0) {
            TRACE_ERR_CLASS("shared[%s] unlink error: %s!", CSTR(m_name),
                            ERRSTR);
            return false;
        }
    } else {
        if ((m_fd > 0) && (::close(m_fd) != 0)) {
            TRACE_ERR_CLASS("shared[%s] close error: %s!", CSTR(m_name),
                            ERRSTR);
            return false;
        }
    }
    m_fd = -1;
#if USE_SEMAPHORE
    if (m_procSem) {
        m_procSem->close();
        m_procSem = nullptr;
    }
#endif
    return true;
}

/**
 * @brief Copy bytes from @p data to the start of the mapped memory.
 *
 * The copy runs under the inter-process guard (semaphore when USE_SEMAPHORE
 * is set, exclusive flock() otherwise). The byte count is
 * CLIP(1, size, m_size). For FILE_MMAP the mapping is flushed with msync()
 * before the guard is released.
 *
 * @param data Source buffer; must not be null.
 * @param size Number of bytes offered; must be >= 1.
 * @return Number of bytes copied, or -1 on invalid arguments, missing
 *         mapping or msync() failure.
 */
int MemShared::putData(void* data, int size) {
    if (!data || size < 1 || !m_memStart) {
        return -1;
    }
#if USE_SEMAPHORE
    if (!m_procSem) {
        return -1;
    }
    m_procSem->wait();
#else
    flock(m_fd, LOCK_EX);  // exclusive lock
#endif
#if 0
    TRACE_DBG("locked------");
    Thread::msleep(5000);
#endif
    const auto copied = CLIP(1, size, m_size);
    memcpy(m_memStart, data, copied);

    bool synced = true;
    if (m_type == MemShared::Type::FILE_MMAP) {
        synced = !(msync(m_memStart, m_size, MS_SYNC) < 0);
    }

    // Single release point for both the success and the failure path.
#if USE_SEMAPHORE
    m_procSem->post();
#else
    flock(m_fd, LOCK_UN);
#endif
    if (!synced) {
        return -1;
    }
#if 0
    TRACE_DBG("release------");
    Thread::msleep(5000);
#endif
    return copied;
}

/**
 * @brief Copy bytes from the start of the mapped memory into @p buffer.
 *
 * The copy runs under the inter-process guard (semaphore when USE_SEMAPHORE
 * is set, shared flock() otherwise). The byte count is
 * CLIP(1, size, m_size).
 *
 * @param buffer Destination buffer; must not be null.
 * @param size   Capacity of @p buffer in bytes; must be >= 1.
 * @return Number of bytes copied, or -1 on invalid arguments or missing
 *         mapping.
 */
int MemShared::getData(void* buffer, int size) {
    if (!buffer || size < 1 || !m_memStart) {
        return -1;
    }
#if USE_SEMAPHORE
    if (!m_procSem) {
        return -1;
    }
    m_procSem->wait();
#else
    flock(m_fd, LOCK_SH);  // shared lock
#endif

    const auto copied = CLIP(1, size, m_size);
    memcpy(buffer, m_memStart, copied);

#if USE_SEMAPHORE
    m_procSem->post();
#else
    flock(m_fd, LOCK_UN);
#endif

    return copied;
}

/**
 * @brief Current size of the opened object as reported by fstat().
 *
 * @return st_size of the descriptor, or 0 when nothing is open or fstat()
 *         fails.
 */
int MemShared::size() {
    if (m_fd < 0) {
        return 0;
    }
    struct stat info;
    if (fstat(m_fd, &info) < 0) {
        return 0;
    }
    return info.st_size;
}

/**
 * @brief Start address of the mapping.
 *
 * @return m_memStart as is.
 */
void* MemShared::data() {
    return m_memStart;
}
}  // namespace appkit
